AWS CDK v2.87.0 で IoT rule actions に Step Functions State Machine の開始を追加可能になりました
こんにちは、CX事業本部 Delivery部の若槻です。
AWS IoT rule actions を使用すると、指定したトピックにデータがパブリッシュされた際に、Lambda 関数の呼び出しや Amazon Kinesis Data Streams へのデータの送信などのアクションを定義することができます。
そして最近リリースされた AWS CDK v2.87.0 で IoT rule actions として、Step Functions State Machine の開始を追加可能になりました
アップデートされたのは Alpha modules となります。
Alpha modules (2.87.0-alpha.0)
Features
このアップデートにより、トピックにデータがパブリッシュされた際に、ETL パイプラインや分散処理のオーケストレーション実行を簡単に実装することが可能となります。
試してみた
必要なモジュールのインストール
IoT rule actions の CDK 実装は現在 Alpha module で提供されているので、モジュールを別途インストールします。
npm install -D @aws-cdk/aws-iot-alpha @aws-cdk/aws-iot-actions-alpha
インストールしたモジュールは下記となります。
CDK コード
AWS CDK(TypeScript)のコードです。
@aws-cdk/aws-iot-actions-alpha
クラスで StepFunctionsStateMachineAction
を使用して、TopicRule の actions および errorAction に State Machine を指定可能となっています。それぞれ異なる State Machine を指定しています。
import { aws_stepfunctions as stepfunctions, Stack, StackProps, } from 'aws-cdk-lib'; import { Construct } from 'constructs'; import * as aws_iot_alpha from '@aws-cdk/aws-iot-alpha'; import * as aws_iot_actions_alpha from '@aws-cdk/aws-iot-actions-alpha'; export class CdkSampleStack extends Stack { constructor(scope: Construct, id: string, props: StackProps) { super(scope, id, props); const myStateMachine = new stepfunctions.StateMachine( this, 'MyStateMachine', { stateMachineName: 'myStateMachine', definitionBody: stepfunctions.DefinitionBody.fromChainable( new stepfunctions.Pass(this, 'MyPassState', { parameters: { message: stepfunctions.JsonPath.stringAt('$.message'), }, }) ), } ); const myErrorStateMachine = new stepfunctions.StateMachine( this, 'MyErrorStateMachine', { stateMachineName: 'myErrorStateMachine', definitionBody: stepfunctions.DefinitionBody.fromChainable( new stepfunctions.Pass(this, 'ErrorState', { parameters: { ruleName: stepfunctions.JsonPath.stringAt('$.ruleName'), topic: stepfunctions.JsonPath.stringAt('$.topic'), failedResource: stepfunctions.JsonPath.stringAt( '$.failures[0].failedResource' ), }, }) ), } ); new aws_iot_alpha.TopicRule(this, 'MyTopicRule', { topicRuleName: 'myTopicRule', sql: aws_iot_alpha.IotSql.fromStringAsVer20160323( "SELECT * FROM 'topic/subtopic'" ), actions: [ new aws_iot_actions_alpha.StepFunctionsStateMachineAction(myStateMachine), ], errorAction: new aws_iot_actions_alpha.StepFunctionsStateMachineAction( myErrorStateMachine ), }); } }
上記定義により作成されるリソースは次のようになります。TopicRule の実体のリソースと、IoT action を実行するための権限リソースが作成されています。
動作確認
アクションの実行
トピックルールの SQL で指定したトピック topic/subtopic
に正常な JSON データをパブリッシュしみます。
aws iot-data publish \ --topic 'topic/subtopic' \ --cli-binary-format raw-in-base64-out \ --payload '{"message":"hello!"}'
myStateMachine の直近の実行履歴を取得します。
$ executionArn=$(aws stepfunctions list-executions \ --state-machine-arn arn:aws:states:${region}:${account_id}:stateMachine:myStateMachine \ --max-items 1 | jq -r '.executions[0].executionArn')
直近の実行内容を確認すると、トピックルールから myStateMachine にデータが渡されて実行が行われたことが分かります。
$ aws stepfunctions describe-execution \ --execution-arn ${executionArn} { "executionArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXXX:execution:myStateMachine:268cc458-5ae6-490d-bc88-8b63d0ff3c3b", "stateMachineArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXXX:stateMachine:myStateMachine", "name": "268cc458-5ae6-490d-bc88-8b63d0ff3c3b", "status": "SUCCEEDED", "startDate": "2023-07-16T21:31:04.089000+09:00", "stopDate": "2023-07-16T21:31:04.154000+09:00", "input": "{\"message\":\"hello!\"}", "inputDetails": { "included": true }, "output": "{\"message\":\"hello!\"}", "outputDetails": { "included": true } }
エラーアクションの実行
同じくトピック topic/subtopic
に、今度は不正な JSON データをパブリッシュしみます。
aws iot-data publish \ --topic 'topic/subtopic' \ --cli-binary-format raw-in-base64-out \ --payload 'invalid json'
myErrorStateMachine の直近の実行履歴を取得します。
$ executionArn=$(aws stepfunctions list-executions \ --state-machine-arn arn:aws:states:${region}:${account_id}:stateMachine:myErrorStateMachine \ --max-items 1 | jq -r '.executions[0].executionArn')
すると、トピックルールから myErrorStateMachine にトピックルールでのエラーの情報がデータとして渡されて実行が行われたことが分かります。
$ aws stepfunctions describe-execution \ --execution-arn ${executionArn} { "executionArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXXX:execution:myErrorStateMachine:47974352-9eb8-4a20-a368-423a8967b558", "stateMachineArn": "arn:aws:states:ap-northeast-1:XXXXXXXXXXXXX:stateMachine:myErrorStateMachine", "name": "47974352-9eb8-4a20-a368-423a8967b558", "status": "SUCCEEDED", "startDate": "2023-07-16T21:31:31.440000+09:00", "stopDate": "2023-07-16T21:31:31.515000+09:00", "input": "{\"ruleName\":\"myTopicRule\",\"topic\":\"topic/subtopic\",\"cloudwatchTraceId\":\"49fc139d-31dc-d44d-9dc4-c738a11c1b41\",\"clientId\":\"N/A\",\"base64OriginalPayload\":\"aW52YWxpZCBqc29u\",\"failures\":[{\"failedAction\":\"StepFunctionsAction\",\"failedResource\":\"myStateMachine\",\"errorMessage\":\"Failed to start Step Functions execution. The error received was 'Input must be a valid JSON string' . Message arrived on: topic/subtopic, Action: stepFunctions, StateMachineName: myStateMachine, ExecutionName: adf42997-4f10-47aa-9fdf-9018294f6737\"}]}", "inputDetails": { "included": true }, "output": "{\"ruleName\":\"myTopicRule\",\"topic\":\"topic/subtopic\",\"failedResource\":\"myStateMachine\"}", "outputDetails": { "included": true } }
今回のアップデートについて
今回、「Step Functions State Machine の開始」が追加されましたが、これにより@aws-cdk/aws-iot-actions-alpha
では現在合わせて14種類の IoT rule actions が利用可能になっています。
export * from './cloudwatch-logs-action'; export * from './cloudwatch-put-metric-action'; export * from './cloudwatch-set-alarm-state-action'; export * from './common-action-props'; export * from './dynamodbv2-put-item-action'; export * from './firehose-put-record-action'; export * from './iotevents-put-message-action'; export * from './iot-republish-action'; export * from './kinesis-put-record-action'; export * from './lambda-function-action'; export * from './s3-put-object-action'; export * from './sqs-queue-action'; export * from './sns-topic-action'; + export * from './step-functions-state-machine-action';
選択肢が多いのは嬉しいですね。
おわりに
AWS CDK v2.87.0 で IoT rule actions に Step Functions State Machine の開始を追加可能になりました。
AWS CDK での IoT 周りの実装はまだまだ手薄なので、今後もアップデートが続くことを期待しています。
参考
以上